Followable Apps Documentation

Table of Contents

  1. What is a Followable App?
  2. When to Use Followable Apps
  3. Where to Use Followable Apps
  4. How to Use Followable Apps
  5. Examples
  6. Best Practices
  7. Troubleshooting

What is a Followable App?

A Followable App is a specialized type of Corva backend application that can be followed by other applications, creating a chain reaction of app executions. When a followable app produces data, it automatically triggers the execution of any apps that are configured to follow it.

Key Characteristics:

  • Data Producer: Followable apps must produce data to trigger following apps
  • Chain Reaction: They enable cascading workflows where one app's output becomes another app's input
  • App Type Restrictions: Only stream and scheduled apps can be made followable
  • Real-time Processing: They facilitate real-time data processing pipelines

Core Concept:

Followable App → Produces Data → Triggers Following Apps → Chain Reaction

The followable app architecture enables building complex data processing workflows where apps can depend on the output of other apps, creating sophisticated real-time analytics pipelines.
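The flow above can be modeled locally with a small sketch. Everything here is a hypothetical in-memory stand-in (`FollowGraph`, `register`, `run`, and the two handlers are illustrative names, not part of the Corva SDK); it only demonstrates that an app's produced records are handed to each follower, and that an app producing nothing triggers nothing.

```python
from collections import defaultdict
from typing import Callable, Dict, List

# Hypothetical stand-in for the platform's follow mechanism (not the Corva SDK).
Handler = Callable[[List[dict]], List[dict]]


class FollowGraph:
    def __init__(self) -> None:
        self._apps: Dict[str, Handler] = {}
        self._followers: Dict[str, List[str]] = defaultdict(list)
        self.log: List[str] = []  # order in which apps were executed

    def register(self, name: str, handler: Handler, follows: str = "") -> None:
        """Register an app and, optionally, the app it follows."""
        self._apps[name] = handler
        if follows:
            self._followers[follows].append(name)

    def run(self, name: str, records: List[dict]) -> None:
        """Run an app, then pass whatever it produced to each follower."""
        self.log.append(name)
        produced = self._apps[name](records)
        if not produced:
            return  # nothing produced, so no follower is triggered
        for follower in self._followers[name]:
            self.run(follower, produced)


def clean(records: List[dict]) -> List[dict]:
    # Drop records without a value
    return [r for r in records if r.get("value") is not None]


def double(records: List[dict]) -> List[dict]:
    return [{**r, "value": r["value"] * 2} for r in records]


graph = FollowGraph()
graph.register("cleaner", clean)
graph.register("doubler", double, follows="cleaner")
graph.run("cleaner", [{"value": 1}, {"value": None}])
```

After the call, `graph.log` lists the apps in execution order, which makes it easy to check a planned chain before building the real apps.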


When to Use Followable Apps

Primary Use Cases:

  1. Multi-Stage Data Processing

    • When you need to process data through multiple computational stages
    • Each stage depends on the output of the previous stage
    • Example: Raw sensor data → Cleaned data → Analytics → Alerts
  2. Real-time Analytics Pipelines

    • When building complex analytics workflows
    • Multiple apps need to react to the same data events
    • Real-time calculations that depend on processed results
  3. Event-Driven Architectures

    • When you want to trigger multiple processes from a single data event
    • Decoupling data producers from data consumers
    • Building reactive systems that respond to data changes
  4. Data Enrichment Workflows

    • When raw data needs multiple enrichment steps
    • Each enrichment step can be handled by a specialized app
    • Final enriched data is used by downstream applications

Scenarios Where Followable Apps Excel:

  • Drilling Operations: Raw drilling data → Processed parameters → Alerts/Recommendations
  • Well Performance: Production data → Calculations → Performance metrics → Optimization suggestions
  • Equipment Monitoring: Sensor data → Health indicators → Predictive maintenance alerts
  • Quality Control: Raw measurements → Statistical analysis → Quality scores → Reports

Where to Use Followable Apps

Platform Context:

Followable apps operate within the Corva DevCenter ecosystem and are built using the Corva Python SDK.

Infrastructure Requirements:

  1. Corva DevCenter Account

    • Backend app development environment
    • Access to datasets and APIs
    • App deployment and management tools
  2. Data Sources

    • Real-time data streams (for stream apps)
    • Scheduled data intervals (for scheduled apps)
    • Proper data ingestion setup
  3. Datasets

    • Write permissions to target datasets
    • Proper dataset configuration for data storage
    • Understanding of data schema requirements

Architecture Placement:

Data Sources → Stream/Scheduled Apps (Followable) → Following Apps → End Users/Systems

Integration Points:

  • Upstream: Data ingestion systems, sensors, databases
  • Downstream: Visualization tools, alert systems, reporting applications
  • Lateral: Other Corva apps, external APIs, notification systems

How to Use Followable Apps

Step 1: Create a Followable App

Basic Structure:

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled

@scheduled
def followable_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    # Process your data
    processed_data = process_raw_data(event)

    # Prepare data for storage and following apps
    data = [
        {
            'asset_id': event.asset_id,
            'version': 1,
            'timestamp': processed_data['timestamp'],
            'data': processed_data['results'],
        }
    ]

    # Store data and trigger following apps (recommended method)
    api.insert_data(
        provider='your-provider',
        dataset='your-dataset',
        data=data,
        produce=True,  # This flag enables following apps to be triggered
    )

    return "Data processed and published successfully"

Step 2: Configure the App as Followable

In the Corva DevCenter:

  1. Open your app page
  2. Navigate to the Settings tab
  3. Activate the "Followable App" toggle
  4. Select a dataset (you must have write permissions for it)
  5. Save the configuration

Step 3: Create Following Apps

Scheduled Following App:

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled

@scheduled
def following_scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    # Query data from the followable app's dataset
    data = api.get_dataset(
        provider='your-provider',
        dataset='followable-app-dataset',
        query={
            'asset_id': event.asset_id,
            'company_id': event.company_id,
        },
        sort={'timestamp': 1},
        limit=100,
    )

    # Process the data
    results = analyze_followable_data(data)

    # Store results or trigger additional actions
    return results
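The scheduled example leaves `analyze_followable_data` undefined. A minimal sketch of what such a helper might do is shown here, assuming each queried record is a dict with a nested `data` dict and that the field of interest is numeric; the `field` parameter and its default name `value` are illustrative assumptions, not part of the SDK.

```python
from statistics import mean
from typing import Dict, List, Optional


def analyze_followable_data(
    records: List[dict], field: str = "value"
) -> Optional[Dict[str, float]]:
    """Summarize one numeric field from records shaped like {'data': {...}}.

    Hypothetical helper: the record shape and field name are assumptions.
    """
    values = [
        r["data"][field]
        for r in records
        if isinstance(r.get("data"), dict)
        and isinstance(r["data"].get(field), (int, float))
    ]
    if not values:
        return None  # nothing usable in this batch
    return {
        "count": len(values),
        "min": min(values),
        "max": max(values),
        "mean": mean(values),
    }
```

Returning `None` for an empty batch lets the calling app decide whether to skip publishing instead of writing an empty summary.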

Stream Following App:

from corva import Api, Cache, StreamTimeEvent, stream

@stream
def following_stream_app(event: StreamTimeEvent, api: Api, cache: Cache):
    # Data is automatically provided in the event
    for record in event.records:
        # Process each data record from the followable app
        process_followable_record(record.data)

    return "Stream processing completed"

Step 4: Configure Following Relationships

In the Corva DevCenter when creating a following app:

  1. Choose the same Segment and Log Type as the followable app
  2. Select the followable app under "Select the data that your app will follow"
  3. Complete app creation

Data Production Methods:

Method 1: Separate API Calls (Slower)

# First, insert data
api.insert_data(
    provider='my-provider',
    dataset='my-dataset',
    data=data,
)

# Then, produce messages
api.produce_messages(data=data)

Method 2: Single API Call (Recommended)

# Insert data and produce messages in one call
api.insert_data(
    provider='my-provider',
    dataset='my-dataset',
    data=data,
    produce=True,  # Enable message production
)
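To compare the two production paths without touching the platform, the sketch below wraps them in one helper and exercises it against a recording test double. `StubApi` and `publish` are hypothetical names introduced for illustration; the stub only mirrors the `insert_data` and `produce_messages` call shapes used in this document and is not the real `Api` class.

```python
from typing import List, Optional, Tuple


class StubApi:
    """Recording test double (an assumption for local testing, not corva.Api)."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Optional[bool]]] = []

    def insert_data(self, provider, dataset, data, produce=False):
        self.calls.append(("insert_data", produce))

    def produce_messages(self, data):
        self.calls.append(("produce_messages", None))


def publish(api, provider: str, dataset: str, data: List[dict],
            single_call: bool = True) -> int:
    """Store records and notify following apps; returns the record count."""
    if not data:
        return 0  # nothing to store, nothing to produce
    if single_call:
        # One round trip: store and produce together
        api.insert_data(provider=provider, dataset=dataset, data=data, produce=True)
    else:
        # Two round trips: store first, then produce
        api.insert_data(provider=provider, dataset=dataset, data=data)
        api.produce_messages(data=data)
    return len(data)
```

Routing every publish through one helper keeps the choice of method in a single place, so a missing `produce=True` cannot slip into an individual app.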

Examples

Example 1: Drilling Data Processing Pipeline

Followable App - Raw Data Processor:

import time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled

@scheduled
def drilling_data_processor(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    """
    Processes raw drilling data and calculates basic parameters
    """
    # Get raw drilling data (get_raw_drilling_data is a placeholder for your own query logic)
    raw_data = get_raw_drilling_data(event.asset_id, event.start_time, event.end_time)

    processed_records = []
    for record in raw_data:
        # Calculate drilling parameters
        processed_record = {
            'asset_id': event.asset_id,
            'version': 1,
            'timestamp': record['timestamp'],
            'data': {
                'depth': record['depth'],
                'rop': calculate_rop(record),
                'wob': record['weight_on_bit'],
                'rpm': record['rotary_speed'],
                'torque': record['torque'],
                'flow_rate': record['mud_flow_rate'],
                'processed_at': int(time.time()),
            },
        }
        processed_records.append(processed_record)

    # Store processed data and trigger following apps
    api.insert_data(
        provider='drilling-analytics',
        dataset='processed-drilling-data',
        data=processed_records,
        produce=True,
    )

    return f"Processed {len(processed_records)} drilling records"
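The processor above calls `calculate_rop` without defining it. One plausible way to derive rate of penetration is from two consecutive samples, as sketched below. This is an assumption-laden illustration: the helper name `rop_between`, the two-record signature (the document's call passes a single record), depth in feet, timestamps in Unix seconds, and the result in feet per hour are all choices made here, not details confirmed by the source.

```python
def rop_between(prev: dict, curr: dict) -> float:
    """Rate of penetration between two samples, in depth units per hour.

    Hypothetical helper. Assumes 'timestamp' is in seconds and 'depth'
    increases while drilling.
    """
    elapsed = curr["timestamp"] - prev["timestamp"]
    if elapsed <= 0:
        return 0.0  # duplicate or out-of-order samples carry no rate
    drilled = curr["depth"] - prev["depth"]
    # Negative depth change (e.g. pulling off bottom) is not penetration
    return max(drilled, 0.0) * 3600.0 / elapsed
```

A per-record variant would need the previous sample from somewhere, for example the app cache or the previous item in the batch.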

Following App - Performance Analyzer:

import time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled

@scheduled
def drilling_performance_analyzer(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    """
    Analyzes processed drilling data for performance metrics
    """
    # Get processed data from the followable app
    processed_data = api.get_dataset(
        provider='drilling-analytics',
        dataset='processed-drilling-data',
        query={
            'asset_id': event.asset_id,
            'company_id': event.company_id,
            'timestamp': {'$gte': event.start_time, '$lte': event.end_time},
        },
        sort={'timestamp': 1},
        limit=500,  # get_dataset expects a limit; tune this to your data volume
    )

    if not processed_data:
        return "No processed data available"

    # Calculate performance metrics
    performance_metrics = []
    for record in processed_data:
        data = record['data']

        # Calculate drilling efficiency
        efficiency_score = calculate_drilling_efficiency(
            data['rop'], data['wob'], data['rpm'], data['torque']
        )

        # Detect anomalies
        anomalies = detect_drilling_anomalies(data)

        performance_record = {
            'asset_id': event.asset_id,
            'version': 1,
            'timestamp': record['timestamp'],
            'data': {
                'efficiency_score': efficiency_score,
                'anomalies': anomalies,
                'recommendations': generate_recommendations(data, efficiency_score),
                'analyzed_at': int(time.time()),
            },
        }
        performance_metrics.append(performance_record)

    # Store performance analysis
    api.insert_data(
        provider='drilling-analytics',
        dataset='drilling-performance',
        data=performance_metrics,
        produce=True,  # This could trigger alert systems
    )

    return f"Analyzed performance for {len(performance_metrics)} records"

Example 2: Real-time Well Monitoring

Followable App - Sensor Data Aggregator:

import time

from corva import Api, Cache, StreamTimeEvent, stream

@stream
def sensor_data_aggregator(event: StreamTimeEvent, api: Api, cache: Cache):
    """
    Aggregates and validates real-time sensor data
    """
    aggregated_data = []

    for record in event.records:
        # Validate and clean sensor data
        if validate_sensor_data(record.data):
            aggregated_record = {
                'asset_id': event.asset_id,
                'version': 1,
                'timestamp': record.timestamp,
                'data': {
                    'pressure': record.data.get('pressure'),
                    'temperature': record.data.get('temperature'),
                    'flow_rate': record.data.get('flow_rate'),
                    'vibration': record.data.get('vibration'),
                    'quality_score': calculate_data_quality(record.data),
                    'aggregated_at': int(time.time()),
                },
            }
            aggregated_data.append(aggregated_record)

    if aggregated_data:
        # Store aggregated data and trigger following apps
        api.insert_data(
            provider='well-monitoring',
            dataset='aggregated-sensor-data',
            data=aggregated_data,
            produce=True,
        )

    return f"Aggregated {len(aggregated_data)} sensor records"
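The aggregator relies on two helpers it does not define, `validate_sensor_data` and `calculate_data_quality`. The sketch below shows one simple interpretation: quality is the fraction of expected fields that hold a finite number, and a record is valid when that fraction meets a minimum. The field list mirrors the aggregator above; the scoring rule and the `min_quality` default are illustrative assumptions.

```python
import math

# Fields the aggregator above reads from each record
EXPECTED_FIELDS = ("pressure", "temperature", "flow_rate", "vibration")


def _is_number(value) -> bool:
    """True for finite ints/floats; booleans, None, NaN and inf are rejected."""
    return (
        isinstance(value, (int, float))
        and not isinstance(value, bool)
        and math.isfinite(value)
    )


def calculate_data_quality(data: dict) -> float:
    """Fraction (0.0 to 1.0) of expected fields holding a usable number."""
    usable = sum(1 for name in EXPECTED_FIELDS if _is_number(data.get(name)))
    return usable / len(EXPECTED_FIELDS)


def validate_sensor_data(data: dict, min_quality: float = 0.5) -> bool:
    """Accept a record when enough of its fields are usable (threshold is a placeholder)."""
    return calculate_data_quality(data) >= min_quality
```

Because the aggregator copies fields with `.get()`, a record can pass validation while still carrying `None` for some fields, so following apps should be prepared for missing values.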

Following App - Alert System:

import time

from corva import Api, Cache, StreamTimeEvent, stream

# PRESSURE_THRESHOLD, TEMPERATURE_THRESHOLD and QUALITY_THRESHOLD are assumed
# to be module-level constants defined for your asset.

@stream
def well_alert_system(event: StreamTimeEvent, api: Api, cache: Cache):
    """
    Monitors aggregated data for alert conditions
    """
    alerts_generated = []

    for record in event.records:
        data = record.data

        # Check for alert conditions
        alerts = []

        if data['pressure'] > PRESSURE_THRESHOLD:
            alerts.append({
                'type': 'HIGH_PRESSURE',
                'severity': 'CRITICAL',
                'message': f"Pressure {data['pressure']} exceeds threshold {PRESSURE_THRESHOLD}"
            })

        if data['temperature'] > TEMPERATURE_THRESHOLD:
            alerts.append({
                'type': 'HIGH_TEMPERATURE',
                'severity': 'WARNING',
                'message': f"Temperature {data['temperature']} exceeds threshold {TEMPERATURE_THRESHOLD}"
            })

        if data['quality_score'] < QUALITY_THRESHOLD:
            alerts.append({
                'type': 'POOR_DATA_QUALITY',
                'severity': 'INFO',
                'message': f"Data quality score {data['quality_score']} below threshold"
            })

        if alerts:
            alert_record = {
                'asset_id': event.asset_id,
                'version': 1,
                'timestamp': record.timestamp,
                'data': {
                    'alerts': alerts,
                    'sensor_data': data,
                    'generated_at': int(time.time()),
                },
            }
            alerts_generated.append(alert_record)

    if alerts_generated:
        # Store alerts
        api.insert_data(
            provider='well-monitoring',
            dataset='well-alerts',
            data=alerts_generated,
            produce=True,  # Could trigger notification systems
        )

        # Send immediate notifications for critical alerts
        for alert_record in alerts_generated:
            for alert in alert_record['data']['alerts']:
                if alert['severity'] == 'CRITICAL':
                    send_immediate_notification(alert, event.asset_id)

    return f"Generated {len(alerts_generated)} alert records"
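The three threshold checks in the alert app share one shape, so they can also be expressed as a rule table. The sketch below is one possible refactoring, not the author's implementation: `Rule`, `RULES`, and `evaluate_rules` are hypothetical names, and the numeric thresholds are placeholders rather than recommended operating limits. Unlike direct indexing, it skips fields that are missing or `None` instead of raising.

```python
import operator
from typing import Callable, List, NamedTuple


class Rule(NamedTuple):
    field: str
    compare: Callable[[float, float], bool]
    threshold: float
    alert_type: str
    severity: str


# Placeholder thresholds for illustration only
RULES = [
    Rule("pressure", operator.gt, 5000.0, "HIGH_PRESSURE", "CRITICAL"),
    Rule("temperature", operator.gt, 150.0, "HIGH_TEMPERATURE", "WARNING"),
    Rule("quality_score", operator.lt, 0.8, "POOR_DATA_QUALITY", "INFO"),
]


def evaluate_rules(data: dict, rules: List[Rule] = RULES) -> List[dict]:
    """Return one alert dict per rule that the record violates."""
    alerts = []
    for rule in rules:
        value = data.get(rule.field)
        if value is None:
            continue  # missing reading: nothing to compare
        if rule.compare(value, rule.threshold):
            alerts.append({
                "type": rule.alert_type,
                "severity": rule.severity,
                "message": f"{rule.field} {value} violates threshold {rule.threshold}",
            })
    return alerts
```

Adding a new alert condition then means adding one row to `RULES` rather than another `if` block.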

Best Practices

1. Data Design

  • Consistent Schema: Ensure data structure consistency across followable and following apps
  • Versioning: Use version fields to handle schema evolution
  • Timestamps: Always include accurate timestamps for proper event ordering
  • Asset Context: Include asset_id and company_id for proper data isolation
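These data design points can be enforced with a small pre-publish check. The sketch below assumes the record envelope used throughout this document (`asset_id`, `version`, `timestamp`, `data`); the helper name `validate_record` and the exact type expectations are illustrative assumptions, and `company_id` is left out because the examples here do not set it on inserted records.

```python
from typing import List

# Envelope fields used by the examples in this document and their expected types
REQUIRED_FIELDS = {"asset_id": int, "version": int, "timestamp": int, "data": dict}


def validate_record(record: dict) -> List[str]:
    """Return a list of problems; an empty list means the record looks publishable."""
    problems = []
    for name, expected in REQUIRED_FIELDS.items():
        if name not in record:
            problems.append(f"missing field: {name}")
            continue
        value = record[name]
        # bool is a subclass of int, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, expected):
            problems.append(f"wrong type for {name}: expected {expected.__name__}")
    return problems
```

Running this on every record before calling `insert_data` surfaces schema drift in the followable app rather than in its followers.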

2. Performance Optimization

  • Batch Processing: Process data in batches when possible
  • Efficient Queries: Use proper indexing and query optimization
  • Data Size Management: Avoid producing excessively large data payloads
  • Caching Strategy: Implement caching for frequently accessed data
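As an illustration of the caching point, the sketch below memoizes an expensive lookup for a fixed time window. `TtlCache` is a hypothetical in-memory helper, not the `Cache` object passed to app handlers, and in-process memory may not survive between app invocations, so treat it as a pattern to adapt rather than a drop-in.

```python
import time
from typing import Any, Callable, Dict, Tuple


class TtlCache:
    """Tiny time-to-live cache (illustrative; not the SDK's Cache)."""

    def __init__(self, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so tests can control time
        self._items: Dict[str, Tuple[float, Any]] = {}

    def get_or_compute(self, key: str, compute: Callable[[], Any]) -> Any:
        """Return the cached value for key, recomputing once it has expired."""
        now = self._clock()
        hit = self._items.get(key)
        if hit is not None and hit[0] > now:
            return hit[1]
        value = compute()
        self._items[key] = (now + self._ttl, value)
        return value
```

A typical use would wrap a slow configuration or reference-data query so that it runs once per TTL window instead of once per event.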

3. Error Handling

import time

from corva import Api, Cache, Logger, ScheduledDataTimeEvent, scheduled

@scheduled
def robust_followable_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    try:
        # Main processing logic
        data = process_data(event)

        # Validate data before publishing
        if validate_data(data):
            api.insert_data(
                provider='your-provider',
                dataset='your-dataset',
                data=data,
                produce=True,
            )
            return "Success"
        else:
            raise ValueError("Data validation failed")

    except Exception as e:
        # Log error for debugging (the SDK exposes logging through Logger, not Api)
        Logger.error(f"Followable app error: {str(e)}")

        # Optionally, publish error information
        error_data = [{
            'asset_id': event.asset_id,
            'version': 1,
            'timestamp': int(time.time()),
            'data': {
                'error': str(e),
                'event_info': str(event),
            },
        }]

        api.insert_data(
            provider='your-provider',
            dataset='error-log',
            data=error_data,
        )

        raise  # Re-raise to ensure proper error handling

4. Monitoring and Logging

import time

from corva import Api, Cache, Logger, ScheduledDataTimeEvent, scheduled

@scheduled
def monitored_followable_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    start_time = time.time()

    try:
        # Log app start
        Logger.info(f"Starting followable app for asset {event.asset_id}")

        # Process data
        data = process_data(event)

        # Log processing metrics
        processing_time = time.time() - start_time
        Logger.info(f"Processed {len(data)} records in {processing_time:.2f} seconds")

        # Publish data
        api.insert_data(
            provider='your-provider',
            dataset='your-dataset',
            data=data,
            produce=True,
        )

        # Log success
        Logger.info(f"Successfully published data for {len(data)} records")

        return f"Processed {len(data)} records"

    except Exception as e:
        # Log error with context
        Logger.error(
            f"Followable app failed: {str(e)} "
            f"(asset_id={event.asset_id}, event_type={type(event).__name__}, "
            f"processing_time={time.time() - start_time:.2f}s)"
        )
        raise

Troubleshooting

Common Issues and Solutions

1. Following Apps Not Triggering

Symptoms:

  • Followable app runs successfully
  • Data is stored in dataset
  • Following apps don't execute

Solutions:

# Ensure produce=True is set
api.insert_data(
    provider='your-provider',
    dataset='your-dataset',
    data=data,
    produce=True,  # This is crucial!
)

# Or use separate produce_messages call
api.insert_data(provider='your-provider', dataset='your-dataset', data=data)
api.produce_messages(data=data)

Checklist:

  • produce=True flag is set in insert_data()
  • Following apps have correct segment and log type
  • Followable app configuration is properly saved
  • Dataset has correct write permissions

2. Data Schema Mismatches

Symptoms:

  • Following apps receive unexpected data structure
  • Processing errors in following apps

Solutions:

import time

# Standardize data structure
def standardize_data_format(raw_data, asset_id):
    return {
        'asset_id': asset_id,
        'version': 1,  # Always include version
        'timestamp': int(raw_data.get('timestamp', time.time())),
        'data': {
            # Your actual data here
            'value1': raw_data.get('value1'),
            'value2': raw_data.get('value2'),
            # Include metadata
            'source': 'followable_app_name',
            'processed_at': int(time.time()),
        },
    }

3. Performance Issues

Symptoms:

  • Slow app execution
  • Following apps timing out
  • Large data volumes causing issues

Solutions:

# Implement batching
def process_in_batches(data, batch_size=100):
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        yield batch

# Process and publish in batches
for batch in process_in_batches(large_dataset):
    processed_batch = process_batch(batch)
    api.insert_data(
        provider='your-provider',
        dataset='your-dataset',
        data=processed_batch,
        produce=True,
    )

4. Debugging Following Relationships

Debug Steps:

  1. Verify App Configuration:

    # In following app, log received data (Logger comes from: from corva import Logger)
    Logger.info(f"Received {len(event.records)} records from followable app")
    for record in event.records:
        Logger.debug(f"Record data: {record.data}")
  2. Check Dataset Contents:

    # Query the dataset directly
    stored_data = api.get_dataset(
        provider='your-provider',
        dataset='your-dataset',
        query={'asset_id': event.asset_id},
        sort={'timestamp': -1},
        limit=10,
    )
    Logger.info(f"Found {len(stored_data)} records in dataset")
  3. Validate Event Flow:

    # Add comprehensive logging
    @stream
    def debug_following_app(event: StreamTimeEvent, api: Api, cache: Cache):
        Logger.info(f"Following app triggered with {len(event.records)} records")
        Logger.info(f"Event asset_id: {event.asset_id}")
        # Stream events carry records rather than a start/end time, so report the record range
        if event.records:
            Logger.info(
                f"Record timestamp range: {event.records[0].timestamp} "
                f"to {event.records[-1].timestamp}"
            )

        for i, record in enumerate(event.records):
            Logger.debug(f"Record {i}: {record.data}")

Error Messages and Solutions

| Error Message | Cause | Solution |
|---|---|---|
| "No following apps configured" | App not marked as followable | Enable followable toggle in DevCenter |
| "Dataset write permission denied" | Insufficient permissions | Check dataset permissions in settings |
| "Invalid data format" | Malformed data structure | Validate data schema before publishing |
| "Following app timeout" | Processing takes too long | Optimize following app logic, implement batching |
| "Circular dependency detected" | App following creates loop | Review app following relationships |
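For the circular dependency case, the follow relationships can be reviewed offline before deployment. The sketch below runs a depth-first search over a plain dict that maps each app to the apps it follows. The `follows` mapping and `find_cycle` are hypothetical constructs for a local check; how the platform itself detects loops is not described in this document.

```python
from typing import Dict, List, Optional


def find_cycle(follows: Dict[str, List[str]]) -> Optional[List[str]]:
    """Return one cycle as a path (first app repeated at the end), or None."""
    UNSEEN, IN_PROGRESS, DONE = 0, 1, 2
    state: Dict[str, int] = {}
    path: List[str] = []

    def visit(app: str) -> Optional[List[str]]:
        state[app] = IN_PROGRESS
        path.append(app)
        for upstream in follows.get(app, []):
            upstream_state = state.get(upstream, UNSEEN)
            if upstream_state == IN_PROGRESS:
                # Reached an app that is still on the current path: a loop
                return path[path.index(upstream):] + [upstream]
            if upstream_state == UNSEEN:
                found = visit(upstream)
                if found:
                    return found
        path.pop()
        state[app] = DONE
        return None

    for app in follows:
        if state.get(app, UNSEEN) == UNSEEN:
            cycle = visit(app)
            if cycle:
                return cycle
    return None
```

For example, `find_cycle({"alerts": ["analyzer"], "analyzer": ["processor"]})` finds no loop, while adding `"processor": ["alerts"]` to that mapping closes one.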

Performance Monitoring

import time
from functools import wraps

from corva import Logger, scheduled

def monitor_performance(func):
    @wraps(func)
    def wrapper(event, api, cache):
        start_time = time.time()

        try:
            result = func(event, api, cache)
            duration = time.time() - start_time

            # Log performance metrics
            Logger.info(f"{func.__name__} completed in {duration:.2f}s")

            return result

        except Exception as e:
            duration = time.time() - start_time
            Logger.error(f"{func.__name__} failed after {duration:.2f}s: {str(e)}")
            raise

    return wrapper

# Keep @scheduled outermost so the SDK still receives the platform event;
# monitor_performance then wraps the (event, api, cache) function the SDK calls.
@scheduled
@monitor_performance
def monitored_followable_app(event, api, cache):
    # Your app logic here
    pass

Conclusion

Followable Apps are a powerful feature in the Corva platform that enable the creation of sophisticated, event-driven data processing pipelines. By understanding the concepts, best practices, and implementation patterns outlined in this documentation, you can build robust and scalable applications that leverage the full potential of the Corva ecosystem.

Key Takeaways:

  1. Chain Reactions: Followable apps create automated workflows where data flows seamlessly between applications
  2. Real-time Processing: Enable real-time analytics and immediate response to data events
  3. Modular Architecture: Break complex processing into specialized, maintainable components
  4. Scalable Design: Handle large volumes of data through efficient processing patterns

Next Steps:

  1. Review the Corva Python SDK documentation for additional details
  2. Explore the Corva DevCenter for platform-specific guidance
  3. Start with simple followable app examples before building complex workflows
  4. Implement comprehensive testing and monitoring for production deployments

For additional support and examples, visit the Corva DevCenter Documentation.